package cn.movie;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Demo02_Movie_ImportToHBaseMR2 extends Configured implements Tool{
	/**
	 * Configures and runs the map-only movie import job.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} is the
	 *             output directory (deleted first if it already exists)
	 * @return 0 when the job succeeds, 1 when it fails, -1 on wrong usage
	 * @throws Exception if the file system or the job submission fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		// Both an input and an output path are required.
		if (args.length != 2) {
			System.err.println("usage: Demo02_Movie_ImportToHBaseMR2 <in> <out>");
			return -1;
		}

		// Layer the HBase resources on top of the configuration ToolRunner
		// populated, so generic options (-D, -conf, -fs ...) are honoured.
		// For example, pass -D fs.defaultFS=hdfs://host:8020 to target another cluster.
		Configuration base = getConf();
		Configuration config = (base == null)
				? HBaseConfiguration.create()
				: HBaseConfiguration.create(base);
		config.set("dfs.stream-buffer-size", "134217728"); // stream buffer size; optional

		// Resolve the file system from the path itself so a fully qualified
		// output URI works even when it is not on the default file system.
		Path path = new Path(args[1]);
		System.out.println(path);
		FileSystem fs = path.getFileSystem(config);
		if (fs.exists(path)) {
			// The output format refuses to write into an existing directory.
			fs.delete(path, true);
		}

		Job job = Job.getInstance(config, "Movie");
		job.setJarByClass(getClass());

		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Movie.class);
		job.setNumReduceTasks(0); // map-only job: no reducer needed

		FileInputFormat.addInputPath(job, new Path(args[0])); // input source
		FileOutputFormat.setOutputPath(job, path);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Command-line entry point: hands the arguments to {@link ToolRunner} and
	 * exits the JVM with the status code returned by the tool.
	 *
	 * @param args command-line arguments forwarded to the tool
	 * @throws Exception if the tool fails
	 */
	public static void main(String[] args) throws Exception {
		Tool importer = new Demo02_Movie_ImportToHBaseMR2();
		System.exit(ToolRunner.run(importer, args));
	}

	/**
	 * Mapper that turns each tab-separated input line into a {@link Movie},
	 * writes it to the job output keyed by a freshly generated id, and upserts
	 * the same row into the {@code movie} table through Phoenix JDBC.
	 *
	 * <p>Columns are read by position: 0 = type, 1 = id, 2 = name, 3 = score,
	 * 4 = rater (blank is treated as 0), 5 = url.
	 *
	 * <p>NOTE(review): MID is a random UUID, so a re-executed task presumably
	 * inserts the same movies again under new keys - confirm whether that is
	 * acceptable.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Movie> {
		/** Configuration key that overrides the Phoenix JDBC URL. */
		private static final String JDBC_URL_KEY = "movie.phoenix.jdbc.url";
		/** URL used when {@link #JDBC_URL_KEY} is not set. */
		private static final String DEFAULT_JDBC_URL = "jdbc:phoenix:hadoop31:2181";
		private static final String UPSERT_SQL =
				"upsert into movie(MID,ID,NAME,TYPE,SCORE,RATER,URL) values(?,?,?,?,?,?,?)";
		/** Number of columns a well-formed input line must provide. */
		private static final int FIELD_COUNT = 6;
		/** Upserts buffered on the connection before an intermediate commit. */
		private static final int COMMIT_BATCH_SIZE = 1000;
		private static final String COUNTER_GROUP = "MovieImport";

		Text text = new Text();
		private Movie movie = null;

		private Connection con = null;
		private PreparedStatement ps = null;
		/** Upserts executed since the last commit. */
		private int pending = 0;

		/**
		 * Opens the Phoenix connection and prepares the upsert statement.
		 *
		 * @throws IOException if the driver is missing or the connection or
		 *                     statement cannot be created; failing here avoids
		 *                     a NullPointerException on every record later
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			String url = context.getConfiguration().get(JDBC_URL_KEY, DEFAULT_JDBC_URL);
			try {
				Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); // register the driver
				con = DriverManager.getConnection(url);
				System.out.println("con:" + con);
				ps = con.prepareStatement(UPSERT_SQL);
			} catch (ClassNotFoundException e) {
				throw new IOException("Phoenix JDBC driver is not on the task classpath", e);
			} catch (SQLException e) {
				// cleanup() is not invoked when setup() fails, so release here.
				closeQuietly();
				throw new IOException("Cannot open Phoenix connection to " + url, e);
			}
		}

		/**
		 * Processes one input line. Lines with fewer than {@link #FIELD_COUNT}
		 * columns or with an unparseable score/rater are counted under the
		 * {@code MovieImport} counter group and skipped, so the text output
		 * and the table stay consistent.
		 *
		 * @throws IOException if writing the output or an intermediate commit fails
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Limit -1 keeps trailing empty columns (e.g. a blank url).
			String[] strs = value.toString().split("\t", -1);
			if (strs.length < FIELD_COUNT) {
				context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
				System.err.println("skipping record at offset " + key + ": expected "
						+ FIELD_COUNT + " columns but found " + strs.length);
				return;
			}
			if (strs[4].equals("")) {
				strs[4] = "0";
			}

			// Parse before emitting anything so a bad number cannot leave the
			// record in the text output but not in the table.
			final double score;
			final int rater;
			try {
				score = Double.parseDouble(strs[3]);
				rater = Integer.parseInt(strs[4]);
			} catch (NumberFormatException e) {
				context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
				System.err.println("skipping record at offset " + key
						+ ": score/rater is not numeric: " + e.getMessage());
				return;
			}

			String uu = UUID.randomUUID().toString().replace("-", "");

			// Write the record to the job output path.
			System.out.println("str:" + Arrays.toString(strs));
			text.set(uu);
			System.out.println("uu:" + uu);
			movie = new Movie(strs[1], strs[2], strs[0], strs[3], strs[4], strs[5]);
			context.write(text, movie);

			// Write the same record to HBase through Phoenix.
			try {
				ps.setString(1, uu);
				ps.setString(2, strs[1]);
				ps.setString(3, strs[2]);
				ps.setString(4, strs[0]);
				ps.setDouble(5, score);
				ps.setInt(6, rater);
				ps.setString(7, strs[5]);
				int row = ps.executeUpdate();
				pending++;
				System.out.println("写入行数："+row);
			} catch (SQLException e) {
				// Best effort per record, but make the loss visible in the counters.
				context.getCounter(COUNTER_GROUP, "FAILED_UPSERTS").increment(1);
				e.printStackTrace();
			}

			// Flush periodically so the uncommitted mutations buffered on the
			// connection stay bounded for large input splits.
			if (pending >= COMMIT_BATCH_SIZE) {
				commitPending();
			}
		}

		/**
		 * Commits the remaining upserts and closes the JDBC resources.
		 *
		 * @throws IOException if the final commit fails, so the task is not
		 *                     reported as successful with lost rows
		 */
		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			if (con == null) {
				return;
			}
			System.out.println("关闭连接："+con);
			try {
				// Phoenix connections have auto-commit off by default, so an
				// explicit commit() is required.
				commitPending();
			} finally {
				closeQuietly();
			}
		}

		/** Commits buffered upserts, converting a failure into an IOException. */
		private void commitPending() throws IOException {
			try {
				con.commit();
				pending = 0;
			} catch (SQLException e) {
				throw new IOException("Phoenix commit failed; " + pending
						+ " buffered upserts were not persisted", e);
			}
		}

		/** Closes statement and connection independently; close errors are only logged. */
		private void closeQuietly() {
			if (ps != null) {
				try {
					ps.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
				ps = null;
			}
			if (con != null) {
				try {
					con.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
				con = null;
			}
		}
	}
		
	/**
	 * Plain value holder for one movie record. Every attribute is kept as the
	 * raw string it was constructed or set with.
	 */
	public static class Movie {
		private String id;
		private String name;
		private String type;
		private String score;
		private String rater;
		private String url;

		/** Creates a movie whose attributes are all {@code null}. */
		public Movie() {
		}

		/** Creates a movie with every attribute supplied. */
		public Movie(String id, String name, String type, String score, String rater, String url) {
			this.id = id;
			this.name = name;
			this.type = type;
			this.score = score;
			this.rater = rater;
			this.url = url;
		}

		/**
		 * Renders the attributes as one comma-separated line in the order
		 * id, name, type, score, rater, url; a {@code null} attribute is
		 * rendered as the text "null".
		 */
		@Override
		public String toString() {
			StringBuilder line = new StringBuilder();
			line.append(id).append(',');
			line.append(name).append(',');
			line.append(type).append(',');
			line.append(score).append(',');
			line.append(rater).append(',');
			line.append(url);
			return line.toString();
		}

		/** @return the movie id */
		public String getId() {
			return this.id;
		}

		/** @param id the movie id */
		public void setId(String id) {
			this.id = id;
		}

		/** @return the movie name */
		public String getName() {
			return this.name;
		}

		/** @param name the movie name */
		public void setName(String name) {
			this.name = name;
		}

		/** @return the movie type */
		public String getType() {
			return this.type;
		}

		/** @param type the movie type */
		public void setType(String type) {
			this.type = type;
		}

		/** @return the score, as text */
		public String getScore() {
			return this.score;
		}

		/** @param score the score, as text */
		public void setScore(String score) {
			this.score = score;
		}

		/** @return the rater value, as text */
		public String getRater() {
			return this.rater;
		}

		/** @param rater the rater value, as text */
		public void setRater(String rater) {
			this.rater = rater;
		}

		/** @return the movie url */
		public String getUrl() {
			return this.url;
		}

		/** @param url the movie url */
		public void setUrl(String url) {
			this.url = url;
		}
	}
}



